Skip to content

Conversation

@jakeloo
Copy link
Member

@jakeloo jakeloo commented Sep 3, 2025

Summary by CodeRabbit

  • New Features

    • Public diagnostics for poller status, processing and queued ranges.
    • Worker shutdown now supports graceful close.
    • Automatic cleanup of processed staging data.
  • Improvements

    • Publisher and committer start concurrently and stop based on targets (no interval-based delays).
    • Enhanced range-aware and aggregated logging; missing-data warnings include poller status and brief backoff.
    • Poll failures now surface at higher log level.
  • Refactor

    • Removed interval-based pacing and related configuration surface.
  • Tests

    • Several committer unit tests removed.

@zeet-co
Copy link

zeet-co bot commented Sep 3, 2025

We're building your pull request over on Zeet.
Click me for more info about your build and deployment.
Once built, this branch can be tested at: https://polygon-indexer-q05q-jl-committer-update.insight.zeet.app before merging 😉

@coderabbitai
Copy link

coderabbitai bot commented Sep 3, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

Removed interval-based pacing from the Committer and related CLI/config; added Poller diagnostic APIs and raised certain poll error logs; Worker.Run now aggregates errors and returns nil on failures/empty results and Worker gained Close(); committer unit tests were deleted; WorkModeMonitor now uses hard-coded defaults.

Changes

Cohort / File(s) Summary of Changes
Orchestrator: Committer concurrency and pacing
internal/orchestrator/committer.go
Removed DEFAULT_COMMITTER_TRIGGER_INTERVAL and triggerIntervalMs. Start no longer derives/uses intervals; parallel mode launches publish and commit loops immediately. runCommitLoop/runPublishLoop signatures no longer accept an interval and removed interval-based sleeps; loops now terminate when reaching commitToBlock. fetchBlockData logs poller status via GetPollerStatus() and sleeps 500ms on missing data. Added cleanupProcessedStagingBlocks(ctx) and enhanced termination logs.
Orchestrator: Poller diagnostics and logging
internal/orchestrator/poller.go
Added exported, read-locked diagnostics: GetProcessingRanges(), GetQueuedRanges(), GetPollerStatus(). Elevated non-expected poll errors in processBatch from Debug to Error. Minor comment/formatting refinements; no core algorithm changes.
Worker: Run flow, logging, and shutdown
internal/worker/worker.go
processBatchWithRetry logs now include range. Run aggregates errors into a single summary, returns nil on aggregated errors or when no blocks were fetched, updates metrics only on successful fetch, and uses actual results range for logs. Added public Close() error to close archive source and shutdown worker gracefully.
Configs & CLI: removed interval/work-mode/failure-recoverer config
configs/config.go, cmd/root.go
Removed Interval from CommitterConfig; removed FailureRecovererConfig and WorkModeConfig types and their fields; removed CLI flags and Viper bindings for committer/workMode/failureRecoverer intervals. Added Main StorageMainConfig field to StorageConfig.
Orchestrator: Work mode monitor init
internal/orchestrator/work_mode_monitor.go
NewWorkModeMonitor now uses hard-coded defaults (DEFAULT_WORK_MODE_CHECK_INTERVAL, DEFAULT_LIVE_MODE_THRESHOLD) instead of reading config; corrected big.NewInt usage for threshold.
Tests: Committer unit tests removed
internal/orchestrator/committer_test.go
All tests and imports removed; file now contains only the package declaration.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Orchestrator
  participant Committer
  participant Publisher
  participant Poller
  participant Store

  Orchestrator->>Committer: Start(commitToBlock)
  par Parallel loops (no interval pacing)
    Committer->>Publisher: runPublishLoop()
    Committer->>Committer: runCommitLoop()
  end

  loop Publish until lastPublished == commitToBlock
    Publisher->>Poller: fetchBlockData(range)
    alt Data available
      Poller-->>Publisher: blocks
      Publisher->>Store: publish(blocks)
      Publisher-->>Committer: update lastPublished
    else Missing data
      Poller-->>Publisher: not found + GetPollerStatus()
      Note right of Publisher: sleep 500ms, retry
    end
  end

  loop Commit until lastCommitted == commitToBlock
    Committer->>Store: commit(next block)
    Store-->>Committer: ack
    Committer-->>Committer: update lastCommitted
  end

  Committer-->>Orchestrator: Completed to commitToBlock
Loading
sequenceDiagram
  autonumber
  participant Caller
  participant Worker
  participant Source
  participant Metrics

  Caller->>Worker: Run(requestedRange)
  Worker->>Source: fetch batch(requestedRange)
  alt Any errors in results
    Worker-->>Caller: log aggregated error (requestedRange)
    Worker-->>Caller: return nil
  else No blocks fetched
    Worker-->>Caller: log "no blocks fetched" (requestedRange)
    Worker-->>Caller: return nil
  else Success
    Worker->>Metrics: update on success
    Worker-->>Caller: return results (actual results range)
  end

  Caller->>Worker: Close()
  alt Archive source exists
    Worker->>Source: Close()
  end
  Worker-->>Caller: shutdown complete
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between dfa7c80 and 865c488.

📒 Files selected for processing (7)
  • cmd/root.go (0 hunks)
  • configs/config.go (1 hunks)
  • internal/orchestrator/committer.go (4 hunks)
  • internal/orchestrator/committer_test.go (0 hunks)
  • internal/orchestrator/poller.go (4 hunks)
  • internal/orchestrator/work_mode_monitor.go (1 hunks)
  • internal/worker/worker.go (2 hunks)
✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch jl/committer-update

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
internal/worker/worker.go (1)

123-135: Fix: failed retry results are being treated as successes (can panic downstream).

In processChunkWithRetry, an errored retry is stored in successMap and later appended to finalResults, so Poller.convertPollResultsToBlockData can dereference nil Data. Keep errors in a separate failed map and return them via failedResults.

Apply this diff:

-	// Separate successful and failed
-	successMap := make(map[string]rpc.GetFullBlockResult)
-	var failedBlocks []*big.Int
+	// Separate successful and failed
+	successMap := make(map[string]rpc.GetFullBlockResult)
+	failedMap := make(map[string]rpc.GetFullBlockResult)
+	var failedBlocks []*big.Int

 	for i, result := range results {
 		if i < len(chunk) {
 			if result.Error == nil {
 				successMap[chunk[i].String()] = result
 			} else {
-				failedBlocks = append(failedBlocks, chunk[i])
+				failedBlocks = append(failedBlocks, chunk[i])
+				failedMap[chunk[i].String()] = rpc.GetFullBlockResult{
+					BlockNumber: chunk[i],
+					Error:       result.Error,
+				}
 			}
 		}
 	}

 	// If only one block failed, retry once more
 	if len(failedBlocks) == 1 {
 		retryResults := fetchFunc(ctx, failedBlocks)
 		if len(retryResults) > 0 {
 			if retryResults[0].Error == nil {
 				successMap[failedBlocks[0].String()] = retryResults[0]
 			} else {
-				// Keep the error result
-				successMap[failedBlocks[0].String()] = rpc.GetFullBlockResult{
-					BlockNumber: failedBlocks[0],
-					Error:       retryResults[0].Error,
-				}
+				// Track as failed (do not add to successMap)
+				failedMap[failedBlocks[0].String()] = rpc.GetFullBlockResult{
+					BlockNumber: failedBlocks[0],
+					Error:       retryResults[0].Error,
+				}
 			}
+		} else {
+			failedMap[failedBlocks[0].String()] = rpc.GetFullBlockResult{
+				BlockNumber: failedBlocks[0],
+				Error:       fmt.Errorf("retry did not return result"),
+			}
 		}
 	} else if len(failedBlocks) > 1 {
@@
-		go func() {
+		go func() {
 			defer rwg.Done()
-			leftResults, _ := w.processChunkWithRetry(ctx, leftChunk, fetchFunc)
+			leftResults, leftFailed := w.processChunkWithRetry(ctx, leftChunk, fetchFunc)
 			// Add results to map
 			for _, r := range leftResults {
 				if r.BlockNumber != nil {
 					rwgMutex.Lock()
 					successMap[r.BlockNumber.String()] = r
 					rwgMutex.Unlock()
 				}
 			}
+			for _, fr := range leftFailed {
+				if fr.BlockNumber != nil {
+					rwgMutex.Lock()
+					failedMap[fr.BlockNumber.String()] = fr
+					rwgMutex.Unlock()
+				}
+			}
 		}()
@@
-		go func() {
+		go func() {
 			defer rwg.Done()
-			rightResults, _ := w.processChunkWithRetry(ctx, rightChunk, fetchFunc)
+			rightResults, rightFailed := w.processChunkWithRetry(ctx, rightChunk, fetchFunc)
 			// Add results to map
 			for _, r := range rightResults {
 				if r.BlockNumber != nil {
 					rwgMutex.Lock()
 					successMap[r.BlockNumber.String()] = r
 					rwgMutex.Unlock()
 				}
 			}
+			for _, fr := range rightFailed {
+				if fr.BlockNumber != nil {
+					rwgMutex.Lock()
+					failedMap[fr.BlockNumber.String()] = fr
+					rwgMutex.Unlock()
+				}
+			}
 		}()
@@
 	// Build final results in original order
 	var finalResults []rpc.GetFullBlockResult
 	var failedResults []rpc.GetFullBlockResult
 	for _, block := range chunk {
 		if result, ok := successMap[block.String()]; ok {
 			finalResults = append(finalResults, result)
+		} else if f, ok := failedMap[block.String()]; ok {
+			failedResults = append(failedResults, f)
 		} else {
 			// This should not happen as we have retried all failed blocks
 			failedResults = append(failedResults, rpc.GetFullBlockResult{
 				BlockNumber: block,
 				Error:       fmt.Errorf("failed to fetch block"),
 			})
 		}
 	}

Also applies to: 136-151, 162-190, 195-210

internal/orchestrator/committer.go (2)

191-217: Avoid tight loop when no blocks to commit.

Without interval pacing, this spins and floods logs when empty.

Apply this diff:

 			if len(blockDataToCommit) == 0 {
 				log.Debug().Msg("No block data to commit")
-				continue
+				time.Sleep(50 * time.Millisecond)
+				continue
 			}

219-236: Avoid tight publish loop.

Similarly, add a small idle backoff when there’s nothing to publish (implemented inside publish()).

Apply this diff in publish():

 	if len(blockData) == 0 {
-		return nil
+		time.Sleep(50 * time.Millisecond)
+		return nil
 	}
🧹 Nitpick comments (4)
internal/worker/worker.go (1)

441-449: Close staging source as well.

Worker.Close only closes archive; staging remains open.

Apply this diff:

 func (w *Worker) Close() error {
 	// Close archive if it exists
 	if w.archive != nil {
 		log.Debug().Msg("Closing archive connection")
 		w.archive.Close()
 	}
+	// Close staging if it exists
+	if w.staging != nil {
+		log.Debug().Msg("Closing staging connection")
+		w.staging.Close()
+	}

 	log.Debug().Msg("Worker closed successfully")
 	return nil
 }
internal/orchestrator/poller.go (2)

437-461: Comment says “with a timeout” but no timeout is implemented. Add one or fix the comment.

Currently waits only for completion or ctx cancel, which can block indefinitely.

Apply this diff to add a 15s timeout:

-	// Wait for the range to complete, timeout, or context cancellation
+	// Wait for the range to complete, timeout, or context cancellation
 	select {
 	case <-waitChan:
 		log.Debug().Msgf("Got notification for range %s processing completed", rangeKey)
 		return true // Range completed
+	case <-time.After(15 * time.Second):
+		log.Warn().Msgf("Timeout waiting for range %s to complete", rangeKey)
+		return false // Timed out
 	case <-p.ctx.Done():
 		return false // Context cancelled
 	}

279-285: Ensure worker resources are released during shutdown.

Call Worker.Close() to clean up archive/staging connections.

Apply this diff:

 func (p *Poller) shutdown() {
 	p.cancel()
 	close(p.tasks)
 	p.wg.Wait()
+	if p.worker != nil {
+		_ = p.worker.Close()
+	}
 	log.Info().Msg("Poller shutting down")
 }
internal/orchestrator/committer.go (1)

153-155: Typo in comment.

“acess” → “access”.

Apply this diff:

-	// Store in memory for quick acess
+	// Store in memory for quick access
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between e9f7efc and a10d7b0.

📒 Files selected for processing (3)
  • internal/orchestrator/committer.go (3 hunks)
  • internal/orchestrator/poller.go (4 hunks)
  • internal/worker/worker.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/worker/worker.go (1)
internal/metrics/metrics.go (1)
  • LastFetchedBlock (37-40)
🪛 GitHub Actions: Go Code Format Check
internal/orchestrator/poller.go

[error] 1-1: gofmt check failed. Unformatted file: internal/orchestrator/poller.go. Run 'gofmt -w internal/orchestrator/poller.go' to fix. Command used: gofmt -l .

internal/orchestrator/committer.go

[error] 1-1: gofmt check failed. Unformatted file: internal/orchestrator/committer.go. Run 'gofmt -w internal/orchestrator/committer.go' to fix. Command used: gofmt -l .

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test
🔇 Additional comments (4)
internal/orchestrator/poller.go (3)

311-312: Log level escalation on poll failures looks good.

Error-level logging for real failures on a specific range improves observability.


463-501: Diagnostic getters are concurrency-safe and useful. LGTM.

Read-locking and snapshotting state for status export looks correct.


1-1: Apply gofmt formatting
CI flagged formatting errors in internal/orchestrator/poller.go—run

gofmt -w internal/orchestrator/poller.go

and commit the changes to unblock the pipeline.

internal/orchestrator/committer.go (1)

1-1: Format with gofmt
Run gofmt -w internal/orchestrator/committer.go to resolve the CI formatting failure.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
internal/orchestrator/poller.go (2)

123-131: Fix race: atomically check-and-mark processing to prevent double polling

Between isRangeProcessing and markRangeAsProcessing there’s a TOCTOU race; two goroutines can poll the same range concurrently. Do the check-and-set under one lock.

Apply:

-	// Check if already processing
-	if p.isRangeProcessing(rangeKey) {
-		return nil, ErrBlocksProcessing
-	}
-
-	p.markRangeAsProcessing(rangeKey)
-	defer p.unmarkRangeAsProcessing(rangeKey)
+	// Atomically check-and-mark as processing
+	if !p.tryMarkRangeAsProcessing(rangeKey) {
+		return nil, ErrBlocksProcessing
+	}
+	defer p.unmarkRangeAsProcessing(rangeKey)

And add:

+// tryMarkRangeAsProcessing atomically marks a range as processing.
+func (p *Poller) tryMarkRangeAsProcessing(rangeKey string) bool {
+	p.processingRangesMutex.Lock()
+	defer p.processingRangesMutex.Unlock()
+	if _, exists := p.processingRanges[rangeKey]; exists {
+		return false
+	}
+	// Presence of key denotes "processing"; waiters slice starts empty.
+	p.processingRanges[rangeKey] = []chan struct{}{}
+	return true
+}

131-134: Return the sentinel ErrNoNewBlocks so callers handle it correctly

poll() currently returns a generic error for “no valid block data,” but callers (Request/processBatch) special-case ErrNoNewBlocks. This causes unnecessary error logs.

-	if len(blockData) == 0 || highestBlockNumber == nil {
-		return nil, fmt.Errorf("no valid block data polled")
-	}
+	if len(blockData) == 0 || highestBlockNumber == nil {
+		return nil, ErrNoNewBlocks
+	}
internal/orchestrator/committer.go (2)

191-217: Avoid busy-looping when there’s nothing to commit

With interval pacing removed, this loop will spin and log constantly if no data is ready.

 			if len(blockDataToCommit) == 0 {
-				log.Debug().Msg("No block data to commit")
-				continue
+				log.Debug().Msg("No block data to commit")
+				time.Sleep(100 * time.Millisecond) // small backoff to avoid hot loop
+				continue
 			}

Optionally make the backoff configurable.


219-236: Publish loop can spin; backoff when no progress

publish() returns nil even when no blocks were published; add a progress check and brief sleep.

 func (c *Committer) runPublishLoop(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		default:
-			if c.commitToBlock.Sign() > 0 && c.lastPublishedBlock.Load() >= c.commitToBlock.Uint64() {
+			if c.commitToBlock.Sign() > 0 && c.lastPublishedBlock.Load() >= c.commitToBlock.Uint64() {
 				// Completing the publish loop if we've published more than commit to block
 				log.Info().Msgf("Committer reached configured toBlock %s, the last publish block is %d, stopping publishes", c.commitToBlock.String(), c.lastPublishedBlock.Load())
 				return
 			}
-			if err := c.publish(ctx); err != nil {
+			before := c.lastPublishedBlock.Load()
+			if err := c.publish(ctx); err != nil {
 				log.Error().Err(err).Msg("Error publishing blocks")
 			}
+			// Backoff if no progress
+			if c.lastPublishedBlock.Load() == before {
+				time.Sleep(100 * time.Millisecond)
+			}
 			go c.cleanupProcessedStagingBlocks(ctx)
 		}
 	}
 }
🧹 Nitpick comments (8)
internal/orchestrator/poller.go (5)

231-242: Defensive nil checks to avoid potential panic on partial worker results

If worker aggregates errors and returns partials, result.Data or nested fields might be nil.

-	for _, result := range results {
-		blockData = append(blockData, common.BlockData{
-			Block:        result.Data.Block,
-			Logs:         result.Data.Logs,
-			Transactions: result.Data.Transactions,
-			Traces:       result.Data.Traces,
-		})
-	}
+	for _, result := range results {
+		if result.Data == nil || result.Data.Block == nil {
+			continue
+		}
+		blockData = append(blockData, common.BlockData{
+			Block:        result.Data.Block,
+			Logs:         result.Data.Logs,
+			Transactions: result.Data.Transactions,
+			Traces:       result.Data.Traces,
+		})
+	}

If GetFullBlockResult guarantees non-nil Data, ignore this suggestion.


279-284: Close worker to release resources during shutdown

Worker got a Close(); poller should call it.

 func (p *Poller) shutdown() {
 	p.cancel()
 	close(p.tasks)
 	p.wg.Wait()
+	if p.worker != nil {
+		p.worker.Close()
+	}
 	log.Info().Msg("Poller shutting down")
 }

311-312: Error level may be noisy; consider sampling

Now that these are Error-level, transient RPC flaps may flood logs. Consider sampling or downgrading when error is expected-ish.


437-461: Docstring mentions timeout, but code has none

The function waits until completion or context cancel; no timeout is implemented.

-// waitForRange waits for a range to finish processing with a timeout
+// waitForRange waits for a range to finish processing, or until the poller context is canceled.
@@
-	// Wait for the range to complete, timeout, or context cancellation
+	// Wait for completion or context cancellation

463-501: Diagnostics API is helpful; consider a typed struct for stability

Returning map[string]interface{} hampers evolvability and typing at call sites. Optional, non-blocking.

internal/orchestrator/committer.go (3)

389-391: Use context-aware wait for missing range and make delay configurable

Hard-coded sleep risks stalling shutdown and makes tuning hard.

-		log.Warn().Msgf("Committer didn't find the following range: %v - %v. %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64(), c.poller.GetPollerStatus())
-		time.Sleep(500 * time.Millisecond) // TODO: wait for block time
+		log.Warn().Msgf("Committer didn't find range: %v - %v. %v", blockNumbers[0].Int64(), blockNumbers[len(blockNumbers)-1].Int64(), c.poller.GetPollerStatus())
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-time.After(500 * time.Millisecond): // consider making this configurable
+		}

153-165: Minor typo

“acess” -> “access”.

-	// Store in memory for quick acess
+	// Store in memory for quick access

505-523: Fire-and-forget publish in default mode: OK but consider error surface

Non-blocking publish is fine; if publish fails repeatedly, consider a bounded retry queue to avoid losing lastPublished progression.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a10d7b0 and 60928eb.

📒 Files selected for processing (3)
  • internal/orchestrator/committer.go (4 hunks)
  • internal/orchestrator/committer_test.go (0 hunks)
  • internal/orchestrator/poller.go (4 hunks)
💤 Files with no reviewable changes (1)
  • internal/orchestrator/committer_test.go
🧰 Additional context used
🧬 Code graph analysis (1)
internal/orchestrator/committer.go (3)
internal/publisher/publisher.go (1)
  • GetInstance (37-45)
internal/orchestrator/validator.go (1)
  • NewValidator (23-29)
internal/worker/worker.go (1)
  • NewWorker (47-57)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test
🔇 Additional comments (3)
internal/orchestrator/committer.go (3)

166-183: Parallel start LGTM

Concurrent start for publish/commit is fine and simpler.


386-394: Good: richer diagnostics on missing ranges

Including poller status will help root-cause stalls.


451-459: Reuse sequential path for commit/publish LGTM

The split into getSequentialBlockData{ToCommit,ToPublish} is clean.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
internal/orchestrator/work_mode_monitor.go (3)

49-63: Fix panic on send to closed channel + avoid holding lock while sending.

RegisterChannel can panic if ch is closed, and currently holds the write lock during send. Snapshot mode under lock, unlock, then do a safe non-blocking send with recover; also remove closed channels.

Apply this diff:

 func (m *WorkModeMonitor) RegisterChannel(ch chan WorkMode) {
-	m.channelsMutex.Lock()
-	defer m.channelsMutex.Unlock()
-
-	m.workModeChannels[ch] = struct{}{}
-	// Send current mode to the new channel only if it's not empty
-	if m.currentMode != "" {
-		select {
-		case ch <- m.currentMode:
-			log.Debug().Msg("Initial work mode sent to new channel")
-		default:
-			log.Warn().Msg("Failed to send initial work mode to new channel - channel full")
-		}
-	}
+	// Add channel under lock, snapshot current mode, then release
+	m.channelsMutex.Lock()
+	m.workModeChannels[ch] = struct{}{}
+	mode := m.currentMode
+	m.channelsMutex.Unlock()
+
+	// Send initial mode (non-blocking) outside lock; tolerate closed channels
+	if mode != "" {
+		func() {
+			defer func() {
+				if r := recover(); r != nil {
+					log.Warn().Msg("Failed to send initial work mode - channel closed; unregistering")
+					m.channelsMutex.Lock()
+					delete(m.workModeChannels, ch)
+					m.channelsMutex.Unlock()
+				}
+			}()
+			select {
+			case ch <- mode:
+				log.Debug().Msg("Initial work mode sent to new channel")
+			default:
+				log.Warn().Msg("Failed to send initial work mode to new channel - channel full")
+			}
+		}()
+	}
 }

81-118: Guard currentMode writes to avoid data races.

currentMode is read from RegisterChannel; protect writes here with the same mutex.

Apply this diff:

-		log.Info().Msgf("Work mode changing from %s to %s during startup", m.currentMode, newMode)
-		m.currentMode = newMode
+		log.Info().Msgf("Work mode changing from %s to %s during startup", m.currentMode, newMode)
+		m.channelsMutex.Lock()
+		m.currentMode = newMode
+		m.channelsMutex.Unlock()
 		m.updateWorkModeMetric(newMode)
 		m.broadcastWorkMode(newMode)
@@
-			if newMode != m.currentMode {
-				log.Info().Msgf("Work mode changing from %s to %s", m.currentMode, newMode)
-				m.currentMode = newMode
+			if newMode != m.currentMode {
+				log.Info().Msgf("Work mode changing from %s to %s", m.currentMode, newMode)
+				m.channelsMutex.Lock()
+				m.currentMode = newMode
+				m.channelsMutex.Unlock()
 				m.updateWorkModeMetric(newMode)
 				m.broadcastWorkMode(newMode)
 			}

120-135: Fix unsafe recover and illegal map mutation under RLock in broadcast.

recover in default branch never triggers; deleting from map while RLock held is unsafe. Snapshot channels, send with recover, then delete closed channels under write lock.

Apply this diff:

 func (m *WorkModeMonitor) broadcastWorkMode(mode WorkMode) {
-	m.channelsMutex.RLock()
-	defer m.channelsMutex.RUnlock()
-
-	for ch := range m.workModeChannels {
-		select {
-		case ch <- mode:
-			log.Debug().Msg("Work mode change notification sent")
-		default:
-			if r := recover(); r != nil {
-				log.Warn().Msg("Work mode notification dropped - channel closed")
-				delete(m.workModeChannels, ch)
-			}
-		}
-	}
+	// Snapshot channels to avoid holding write lock while sending
+	m.channelsMutex.RLock()
+	channels := make([]chan WorkMode, 0, len(m.workModeChannels))
+	for ch := range m.workModeChannels {
+		channels = append(channels, ch)
+	}
+	m.channelsMutex.RUnlock()
+
+	var toRemove []chan WorkMode
+	for _, ch := range channels {
+		closed := false
+		func() {
+			defer func() {
+				if r := recover(); r != nil {
+					closed = true
+				}
+			}()
+			select {
+			case ch <- mode:
+				log.Debug().Msg("Work mode change notification sent")
+			default:
+				log.Warn().Msg("Work mode notification dropped - channel full")
+			}
+		}()
+		if closed {
+			toRemove = append(toRemove, ch)
+		}
+	}
+
+	if len(toRemove) > 0 {
+		m.channelsMutex.Lock()
+		for _, ch := range toRemove {
+			delete(m.workModeChannels, ch)
+		}
+		m.channelsMutex.Unlock()
+		log.Warn().Int("removed", len(toRemove)).Msg("Removed closed work mode channels")
+	}
 }
configs/config.go (1)

343-351: Bug: Orchestrator chainBasedConfig writes to Main instead of Orchestrator.

This overwrites the wrong target; use Storage.Orchestrator.Clickhouse.

Apply this diff:

-	if Cfg.Storage.Main.Clickhouse != nil {
-		Cfg.Storage.Main.Clickhouse.ChainBasedConfig = orchestratorChainConfig
-	}
+	if Cfg.Storage.Orchestrator.Clickhouse != nil {
+		Cfg.Storage.Orchestrator.Clickhouse.ChainBasedConfig = orchestratorChainConfig
+	}
🧹 Nitpick comments (3)
internal/orchestrator/work_mode_monitor.go (2)

153-160: Clamp negative diffs and avoid int64 overflow in logs.

Reorgs or clock skew can yield negative diffs; logging via Int64 may overflow. Clamp to zero and log as string.

Apply this diff:

-	blockDiff := new(big.Int).Sub(latestBlock, lastCommittedBlock)
-	log.Debug().Msgf("Committer is %d blocks behind the chain", blockDiff.Int64())
+	blockDiff := new(big.Int).Sub(latestBlock, lastCommittedBlock)
+	if blockDiff.Sign() < 0 {
+		blockDiff = big.NewInt(0)
+	}
+	log.Debug().Msgf("Committer is %s blocks behind the chain", blockDiff.String())
 	if blockDiff.Cmp(m.liveModeThreshold) < 0 {
 		return WorkModeLive, nil
 	}

34-46: Consider env/flag overrides for hard-coded defaults.

Operationally handy to allow overrides (env or flags) even if config keys were removed.

configs/config.go (1)

314-319: Nit: duplicate payload in structured log + Msgf.

Use structured logging without Msgf formatting noise.

Apply this diff:

-		log.Debug().
-			Interface("chainConfig", clickhouse.ChainBasedConfig).
-			Msgf("Loaded chain config %v", clickhouse.ChainBasedConfig)
+		log.Debug().
+			Interface("chainConfig", clickhouse.ChainBasedConfig).
+			Msg("Loaded chain config")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 60928eb and dfa7c80.

📒 Files selected for processing (3)
  • cmd/root.go (0 hunks)
  • configs/config.go (1 hunks)
  • internal/orchestrator/work_mode_monitor.go (1 hunks)
💤 Files with no reviewable changes (1)
  • cmd/root.go
🧰 Additional context used
🧬 Code graph analysis (1)
configs/config.go (3)
internal/orchestrator/poller.go (1)
  • Poller (23-40)
internal/orchestrator/committer.go (1)
  • Committer (24-35)
internal/orchestrator/reorg_handler.go (1)
  • ReorgHandler (21-29)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test
🔇 Additional comments (1)
internal/orchestrator/work_mode_monitor.go (1)

34-46: Good: default init + int64 fix for big.NewInt.

Hard-coded defaults are OK for now; the int64 cast fixes the previous bug.

@jakeloo jakeloo force-pushed the jl/committer-update branch from dfa7c80 to 865c488 Compare September 3, 2025 15:20
@jakeloo jakeloo merged commit 2914e3f into main Sep 3, 2025
4 of 5 checks passed
@jakeloo jakeloo deleted the jl/committer-update branch September 3, 2025 15:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants